package com.sweetdream.table.wordcount

import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

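/**
 * Title: Word frequency count
 * Description: Batch example that sums word frequencies with the Flink Table API
 *              and prints the words whose total frequency equals 2.
 * Date 2020/12/16
 */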
object WordCountTable {

  /**
   * A word paired with a frequency value.
   *
   * @param word      the word itself
   * @param frequency the count associated with the word
   */
  case class WC(word: String, frequency: Long)

  /**
   * Entry point: builds a small in-memory data set of words, aggregates the
   * frequencies per word through the Table API, keeps the words whose summed
   * frequency equals 2, and prints the remaining rows.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Set up the batch execution environment and the table environment on top of it.
    val batchEnv = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(batchEnv)

    // Source: three fixed sample records, each with a frequency of 1.
    val words = batchEnv.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))

    // Transformation: view the data set as a table, then sum the frequency per word.
    val wordTable = words.toTable(tableEnv)
    val totals = wordTable
      .groupBy($"word")
      .select($"word", $"frequency".sum.as("frequency"))

    // Keep only the words whose total is exactly 2 and convert back to a typed data set.
    val seenTwice = totals.filter($"frequency" === 2)
    val output = seenTwice.toDataSet[WC]

    // Sink: print the resulting records.
    output.print()
  }

}
